今天來試寫一個 DAG 吧,我們來講講 DAG 的架構,這是我從官方文件一路摸索上來的,如果有更好的用法,歡迎交流 ~
DAG 可以分成3大塊 :
我看過用 with DAG() 跟 DAG() 方式建立的,我目前覺得 dag decorator 最好用
我看過用 operator、用 sensor 的,目前我覺得用 task decorator 最好用
import pendulum
import datetime
from airflow.decorators import dag
from airflow.decorators import task
# dag decorator
@dag(
dag_id = "test_dag",
schedule = "15 9 * * *",
start_date = pendulum.datetime(2025, 4, 23, tz = "Asia/Taipei"),
catchup = False,
dagrun_timeout = datetime.timedelta(minutes = 30), # currently set the expiry time for 30 mins
default_args = {
"email" : ["youremail@gmail.com"],
"email_on_failure" : True,
"email_on_retry" : True,
"owner": "Livia"
},
tags = ["sales_ETL"]
)
def test_dag() : # 主要 function
# task decorator
@task
def remove_nan_data() :
return "clean_data done"
@task
def change_type():
return "change_type done"
# materialization for the task
remove_nan_data = remove_nan_data()
change_type = change_type()
# set the dependency
remove_nan_data >> change_type
dag = test_dag()
做好 DAG 後,存成 .py 檔,放入你的 airflow 專案資料夾,同 docker-compose.yaml 檔那層有 dags 資料夾,把你的 DAG .py 檔放入這個資料夾後,等你架好的 airflow 偵測到檔案後就會加到介面上可以觀測了。
參考資料 - airlfow Dags : https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html